[New Connector] Jira connector - On Cloud and Server #527
Conversation
connectors/sources/jira.py
Outdated
if not self.ssl_disabled:
    self.ssl_ctx = self._ssl_context(certificate=self.certificate)
This should not happen in `ping` - it's not used in this function, and `get_docs` depends on it. You should initialise it in such a way that both `ping` and `get_docs` could work if called individually, not sequentially.
Understood your concern here. But the connector is designed in such a way that `ping` will always get executed first, and then the `get_docs` method. So there would be no chance of the `get_docs` method being called individually, since the framework itself does not allow it. However, once we have a conclusion on this comment, we'll update the code accordingly.
It's not designed this way; it works this way now because it was the most convenient way to do it. It might and will change in the future, so we want to build connectors that are resilient to changes.
In short, you should treat `ping` and `get_docs()` as methods that can be called in any sequence, or even on their own. All three of these cases are valid:
`ping(); get_docs()`
`ping(); ping()`
`get_docs()`
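One way to satisfy this is to build the SSL context lazily on first access, so `ping` and `get_docs` each work regardless of call order. A minimal sketch, assuming the attribute names from the snippet above (`ssl_disabled`, `certificate`, `_ssl_context`); the real certificate-loading logic is elided:

```python
import ssl


class JiraDataSource:
    """Sketch of lazy SSL-context initialization (assumed names)."""

    def __init__(self, ssl_disabled=True, certificate=None):
        self.ssl_disabled = ssl_disabled
        self.certificate = certificate
        self._ssl_ctx = None  # built on first use, not in ping()

    def _ssl_context(self, certificate):
        # Placeholder for the real certificate-loading logic.
        return ssl.create_default_context()

    @property
    def ssl_ctx(self):
        # Both ping() and get_docs() read this property, so either
        # can be called first (or alone) and still get a valid context.
        if not self.ssl_disabled and self._ssl_ctx is None:
            self._ssl_ctx = self._ssl_context(certificate=self.certificate)
        return self._ssl_ctx
```

With this shape, the context is created exactly once, on whichever method touches it first.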
Oh! Got it. However, this is related to this comment, so do you want me to change this before we reach a conclusion there, or should I wait? In the meantime, I'll address the other comments.
I'll try to address the topic in that comment in the meantime; I'll get back to you ASAP.
Sure, please. Do let me know your opinion on this.
Keeping this in both places, i.e. `ping` and `get_docs`.
@pytest.mark.asyncio
async def test_get_docs():
Ideally you should test multiple cases with different data types returned here. This is the main function you implement, and it has only a single test for a very basic scenario. Can you make sure you test the method properly?
It would be helpful if you could give some more detail on which kinds of tests you want for this method, since this test already covers multiple data types, including project, issue, and attachment.
Co-authored-by: Artem Shelkovnikov <lavatroublebubble@gmail.com>
@tarekziade did you get a chance to review this PR? If you have any feedback, I will address it along with Artem's.
basic_auth = aiohttp.BasicAuth(
    login=auth[0],
    password=auth[1],
)
Will this still work if `self.is_cloud` is `True`?
Yes, this will work for both cloud and on-prem.
I just want to confirm: if `self.is_cloud` is `True`, you are trying to use `service_account_id` and `api_token` to build a basic auth.
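For illustration only, the credential selection being discussed could be sketched like this; the configuration keys are assumptions based on the field names mentioned in this thread:

```python
def pick_credentials(is_cloud, configuration):
    """Return the (login, password) pair to feed into basic auth (sketch)."""
    if is_cloud:
        # Jira Cloud authenticates with a service account id + API token.
        return configuration["service_account_id"], configuration["api_token"]
    # Jira Server authenticates with a plain username + password.
    return configuration["username"], configuration["password"]
```

The resulting pair would then be passed to `aiohttp.BasicAuth(login=..., password=...)` as in the quoted snippet.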
connectors/sources/jira.py
Outdated
yield item

await self.fetchers.join()
await asyncio.gather(project_task, attachment_task)
Is this line needed? If you exit the infinite loop above, does it mean both `project_task` and `attachment_task` are done?
Yes, this line runs the producers. Once the await finishes, both `project_task` and `attachment_task` are completed. The consumer loop is independent of this gather; its responsibility is to consume the documents added by the producer tasks. The consumer loop only ends once it gets the required number of `FINISHED` markers in the queue, which are added by the producer tasks themselves. Hence, in a way, the consumer and producers communicate via the `FINISHED` signal.
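The sentinel handshake described here can be sketched with a plain `asyncio.Queue` (the connector itself uses `MemQueue`; the `FINISHED` name follows the thread):

```python
import asyncio

FINISHED = "FINISHED"  # sentinel each producer pushes when it is done


async def producer(queue, items):
    for item in items:
        await queue.put(item)
    await queue.put(FINISHED)  # tell the consumer this producer finished


async def consumer(queue, producer_count):
    results = []
    done = 0
    # Keep draining until every producer has signalled FINISHED.
    while done < producer_count:
        item = await queue.get()
        if item == FINISHED:
            done += 1
        else:
            results.append(item)
    return results


async def main():
    queue = asyncio.Queue()
    tasks = [
        asyncio.create_task(producer(queue, ["project"])),
        asyncio.create_task(producer(queue, ["issue", "attachment"])),
    ]
    results = await consumer(queue, producer_count=len(tasks))
    await asyncio.gather(*tasks)  # producers are already done at this point
    return results
```

Because the consumer counts sentinels rather than watching the tasks, it exits exactly when all producers have drained, which is why the final `gather` returns immediately.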
buildkite test this
I think we should use the same pattern as in the confluence connector encapsulating and memoizing the client (instance).
    f"Configured concurrent downloads can't be set more than {MAX_CONCURRENT_DOWNLOADS}."
)

def _generate_session(self):
I would really like to see the same pattern we've used for the Confluence connector (see: https://github.com/elastic/connectors-python/pull/568/files), i.e. creating a `JiraClient` class which encapsulates the interaction with the server/cloud instance. It makes the connectors way more consistent.
Sure. However, the testing cycle has been completed, and it would take quite some effort from the QA end to re-test it with new changes, so I strongly recommend we raise a GitHub issue and cover it in an enhancement PR. Let me know your thoughts.
I had concerns about how `get_docs` is implemented.
It uses a ConcurrentTasks object, `fetchers`, to download attachments:
https://github.com/elastic/connectors-python/blob/1c1d6a747221581ba23a713470540a5759cd2527/connectors/sources/jira.py#L88
But I think this is not necessary. A connector implementation should just yield a `lazy_download` (i.e. a callback), and the framework will handle the download at the framework level:
https://github.com/elastic/connectors-python/blob/1c1d6a747221581ba23a713470540a5759cd2527/connectors/byoei.py#L298-L304
So I think what `get_docs` should do is just yield `project`, `issue`, and `attachment`.
I also think it's overkill to use a queue as well. Having a dedicated coroutine for projects and issues is not a big deal, but I don't feel it's necessary. cc @tarekziade
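The pattern the reviewer describes — yielding a document plus a deferred download callback and letting the framework decide when to invoke it — can be sketched roughly like this. The names `get_docs`/`get_content` mirror the thread; the exact framework contract is an assumption:

```python
import asyncio
from functools import partial


async def get_content(attachment, doit=True):
    # Placeholder for the real attachment download logic.
    if not doit:
        return None
    return {"_id": attachment["id"], "_attachment": "base64-data"}


async def get_docs(docs):
    """Yield (document, lazy_download) pairs; the framework invokes the
    callback only when it actually needs the attachment body."""
    for doc in docs:
        if doc.get("type") == "attachment":
            # Defer the expensive download instead of running it here.
            yield doc, partial(get_content, attachment=doc)
        else:
            yield doc, None
```

The connector stays a simple generator, and concurrency for downloads becomes the framework's concern rather than each source's.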
I'll do a full review now
This technique speeds up the time taken by the connector to grab content.
just a few nits. LGTM otherwise
connectors/sources/jira.py
Outdated
URLS = {
    PING: "/rest/api/2/myself",
    PROJECT: "/rest/api/2/project?expand=description,lead,url",
    ISSUES: "/rest/api/2/search?maxResults={maxResults}&startAt={startAt}",
PEP 8 like the others, so:
`/rest/api/2/search?maxResults={max_results}&startAt={start_at}`
Got it
connectors/sources/jira.py
Outdated
FETCH_SIZE = 100
CHUNK_SIZE = 1024
QUEUE_SIZE = 1024
QUEUE_MEM_SIZE = 5  # Size in Megabytes
if it's MiB, add 1024*1024 here
connectors/sources/jira.py
Outdated
self.session = None
self.tasks = 0
self.queue = MemQueue(
    maxsize=QUEUE_SIZE, maxmemsize=QUEUE_MEM_SIZE * 1024 * 1024
see my previous comment about MiB
connectors/sources/jira.py
Outdated
should_paginate = True
while should_paginate:
    async for response in self._api_call(
        url_name=url_name, startAt=start_at, maxResults=FETCH_SIZE
PEP 8, like the others.
connectors/sources/jira.py
Outdated
attachment_name = attachment["filename"]
if os.path.splitext(attachment_name)[-1] not in TIKA_SUPPORTED_FILETYPES:
    logger.debug(f"{attachment_name} is not supported by TIKA, skipping")
`logger.warning`?
Yes, we can put a warning logger instead.
    "_timestamp": attachment["created"],
}
temp_filename = ""
attachment_url = ATTACHMENT_CLOUD if self.is_cloud else ATTACHMENT_SERVER
cc @artem-shelkovnikov this is where #534 would be useful
connectors/sources/jira.py
Outdated
async with aiofiles.open(file=temp_filename, mode="r") as async_buffer:
    # base64 on macOS will add a EOL, so we strip() here
    document["_attachment"] = (await async_buffer.read()).strip()
await remove(temp_filename)
Let's wrap this in try/except with just a warning if the remove fails.
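A sketch of what's being asked for; the connector uses aiofiles' async `remove`, but the shape is the same with the synchronous `os.remove` shown here:

```python
import logging
import os

logger = logging.getLogger(__name__)


def remove_temp_file(temp_filename):
    """Delete the temp file, downgrading failures to a warning so a
    cleanup problem never aborts the sync."""
    try:
        os.remove(temp_filename)
    except OSError as exception:
        logger.warning(f"Could not remove file {temp_filename}: {exception}")
```

A missing or locked file produces a log line instead of an unhandled exception.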
    attachments=attachments, issue_key=issue_key
):
    await self.queue.put(
        (  # pyright: ignore
What does pyright say here?
document_task = asyncio.create_task(_document_task())
self.tasks += 1

# Consumer block to grab items from queue in a loop and yield one at a time.
nice!
FYI @tarekziade, after adding concurrency using
The way it works:
So if you are raising it, it means you are filling the queue super fast and the rest of the pipeline can't follow. Try this:
(no need to limit the number of items to 1024; I noticed that you added that limit, you probably hit this first)
LGTM, just a few nits to address
connectors/sources/jira.py
Outdated
    url_name=url_name, start_at=start_at, max_results=FETCH_SIZE
):
    response_json = await response.json()
    total = response_json.get("total")
If `total` does not exist you get `None`, and the next test will fail.
If you always get `total` and it's an int, change this to:
total = response_json["total"]
If not, make the code more robust:
total = int(response_json.get("total", 0))
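Put together, the defensive pagination loop being suggested might look like this; `fetch` is a hypothetical page getter standing in for the connector's `_api_call`, and the response shape loosely mirrors Jira's search API:

```python
FETCH_SIZE = 100


def paginate(fetch, fetch_size=FETCH_SIZE):
    """Yield items page by page, stopping safely even if 'total' is missing."""
    start_at = 0
    while True:
        response_json = fetch(start_at)
        # Robust against a missing 'total' key, per the review comment.
        total = int(response_json.get("total", 0))
        items = response_json.get("issues", [])
        yield from items
        start_at += fetch_size
        if start_at >= total or not items:
            break
```

An empty or malformed response ends the loop instead of raising a `TypeError` on `None`.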
logger.debug(f"Calling convert_to_b64 for file : {attachment_name}")
await asyncio.to_thread(convert_to_b64, source=temp_filename)
async with aiofiles.open(file=temp_filename, mode="r") as async_buffer:
    # base64 on macOS will add a EOL, so we strip() here
Please add an issue about this so we have the same behavior for `convert_to_b64` on all platforms.
yield {
    "_id": f"{project['name']}-{project['id']}",
    "_timestamp": iso_utc(
        when=datetime.now(pytz.timezone(self.timezone))
Why do you feed `when`? It looks like the default value, as it's done in `iso_utc`.
We're indexing the project every time with a `_timestamp` from `iso_utc`, while the other Jira documents use the timestamp from the response. So, if the Jira server is hosted in a timezone other than UTC, the user would see a timezone mismatch between projects and other objects. To keep it consistent, we feed `when` to `iso_utc` using the timezone of the Jira server.
partial(
    self.get_content,
    issue_key=issue_key,
    attachment=copy(attachment),
Why `copy` here?
Here, `attachment` is a dictionary, so changes made to it after building the `partial` should not be reflected in the deferred call. Hence, we pass a `copy` so the `partial` holds its own snapshot of the passed param.
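The hazard being avoided fits in a few lines: without `copy`, the `partial` closes over the same dict, so a later mutation leaks into the deferred call (`get_name` below is a stand-in for `get_content`):

```python
from copy import copy
from functools import partial


def get_name(attachment):
    return attachment["filename"]


attachment = {"filename": "report.pdf"}

without_copy = partial(get_name, attachment=attachment)
with_copy = partial(get_name, attachment=copy(attachment))

# Mutate the original dict after the partials are built.
attachment["filename"] = "changed.txt"
```

Calling `without_copy()` now returns the mutated value, while `with_copy()` still sees the original snapshot.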
certificate = certificate.replace(" ", "\n")
certificate = " ".join(certificate.split("\n", 1))
certificate = " ".join(certificate.rsplit("\n", 1))
certificate = get_pem_format(certificate, max_split=1)
nice!
Approved, pending one more approval since it's a big patch, plus my nits to address.
LGTM
💔 Failed to create backport PR(s)
To backport manually run:
PR Content
Jira connector with the support for both on-cloud and on-prem platforms.
Checklists
New Dependency